/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connector.jdbc;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer;
import org.apache.flink.types.Row;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLDataException;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;

import static org.apache.flink.connector.jdbc.JdbcTestFixture.INPUT_TABLE;
import static org.apache.flink.connector.jdbc.JdbcTestFixture.INSERT_TEMPLATE;
import static org.apache.flink.connector.jdbc.JdbcTestFixture.OUTPUT_TABLE;
import static org.apache.flink.connector.jdbc.JdbcTestFixture.OUTPUT_TABLE_2;
import static org.apache.flink.connector.jdbc.JdbcTestFixture.OUTPUT_TABLE_3;
import static org.apache.flink.connector.jdbc.JdbcTestFixture.SELECT_ALL_NEWBOOKS;
import static org.apache.flink.connector.jdbc.JdbcTestFixture.SELECT_ALL_NEWBOOKS_2;
import static org.apache.flink.connector.jdbc.JdbcTestFixture.SELECT_ALL_NEWBOOKS_3;
import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA;
import static org.apache.flink.connector.jdbc.JdbcTestFixture.TestEntry;
import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
import static org.apache.flink.util.ExceptionUtils.findThrowable;
import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for the {@link JdbcRowOutputFormat}. */
class JdbcRowOutputFormatTest extends JdbcDataTestBase {

    private JdbcRowOutputFormat jdbcOutputFormat;

    @AfterEach
    void tearDown() throws Exception {
        if (jdbcOutputFormat != null) {
            jdbcOutputFormat.close();
        }
        jdbcOutputFormat = null;

        try (Connection conn = getMetadata().getConnection();
                Statement stat = conn.createStatement()) {
            stat.execute("DELETE FROM " + OUTPUT_TABLE);
        }
    }

    @Test
    void testInvalidDriver() {
        String expectedMsg = "unable to open JDBC writer";
        try {
            jdbcOutputFormat =
                    JdbcRowOutputFormat.buildJdbcOutputFormat()
                            .setDrivername("org.apache.derby.jdbc.idontexist")
                            .setDBUrl(getMetadata().getJdbcUrl())
                            .setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
                            .finish();

            JdbcOutputSerializer<Row> serializer =
                    JdbcOutputSerializer.of(getSerializer(TypeInformation.of(Row.class), false));

            jdbcOutputFormat.open(serializer);
        } catch (Exception e) {
            assertThat(findThrowable(e, IOException.class)).isPresent();
            assertThat(findThrowableWithMessage(e, expectedMsg)).isPresent();
        }
    }

    @Test
    void testInvalidURL() {
        String expectedMsg = "No suitable driver found for jdbc:der:iamanerror:mory:ebookshop";

        jdbcOutputFormat =
                JdbcRowOutputFormat.buildJdbcOutputFormat()
                        .setDrivername(getMetadata().getDriverClass())
                        .setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
                        .setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
                        .finish();
        JdbcOutputSerializer<Row> serializer =
                JdbcOutputSerializer.of(getSerializer(TypeInformation.of(Row.class), false));
        assertThatThrownBy(() -> jdbcOutputFormat.open(serializer))
                .isInstanceOf(IOException.class)
                .satisfies(anyCauseMatches(SQLException.class, expectedMsg));
    }

    @Test
    void testInvalidQuery() {
        String expectedMsg = "unable to open JDBC writer";
        try {
            jdbcOutputFormat =
                    JdbcRowOutputFormat.buildJdbcOutputFormat()
                            .setDrivername(getMetadata().getDriverClass())
                            .setDBUrl(getMetadata().getJdbcUrl())
                            .setQuery("iamnotsql")
                            .finish();
            JdbcOutputSerializer<Row> serializer =
                    JdbcOutputSerializer.of(getSerializer(TypeInformation.of(Row.class), true));
            jdbcOutputFormat.open(serializer);
        } catch (Exception e) {
            assertThat(findThrowable(e, IOException.class)).isPresent();
            assertThat(findThrowableWithMessage(e, expectedMsg)).isPresent();
        }
    }

    @Test
    void testIncompleteConfiguration() {
        String expectedMsg = "jdbc url is empty";
        try {
            jdbcOutputFormat =
                    JdbcRowOutputFormat.buildJdbcOutputFormat()
                            .setDrivername(getMetadata().getDriverClass())
                            .setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
                            .finish();
        } catch (Exception e) {
            assertThat(findThrowable(e, NullPointerException.class)).isPresent();
            assertThat(findThrowableWithMessage(e, expectedMsg)).isPresent();
        }
    }

    @Test
    void testIncompatibleTypes() {
        String expectedMsg = "Invalid character string format for type INTEGER.";
        try {
            jdbcOutputFormat =
                    JdbcRowOutputFormat.buildJdbcOutputFormat()
                            .setDrivername(getMetadata().getDriverClass())
                            .setDBUrl(getMetadata().getJdbcUrl())
                            .setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
                            .finish();
            JdbcOutputSerializer<Row> serializer =
                    JdbcOutputSerializer.of(getSerializer(TypeInformation.of(Row.class), true));
            jdbcOutputFormat.open(serializer);

            Row row = new Row(5);
            row.setField(0, 4);
            row.setField(1, "hello");
            row.setField(2, "world");
            row.setField(3, 0.99);
            row.setField(4, "imthewrongtype");

            jdbcOutputFormat.writeRecord(row);
            jdbcOutputFormat.close();
        } catch (Exception e) {
            assertThat(findThrowable(e, SQLDataException.class)).isPresent();
            assertThat(findThrowableWithMessage(e, expectedMsg)).isPresent();
        }
    }

    @Test
    void testExceptionOnInvalidType() {
        String expectedMsg = "field index: 3, field value: 0.";
        try {
            jdbcOutputFormat =
                    JdbcRowOutputFormat.buildJdbcOutputFormat()
                            .setDrivername(getMetadata().getDriverClass())
                            .setDBUrl(getMetadata().getJdbcUrl())
                            .setQuery(String.format(INSERT_TEMPLATE, OUTPUT_TABLE))
                            .setSqlTypes(
                                    new int[] {
                                        Types.INTEGER,
                                        Types.VARCHAR,
                                        Types.VARCHAR,
                                        Types.DOUBLE,
                                        Types.INTEGER
                                    })
                            .finish();
            JdbcOutputSerializer<Row> serializer =
                    JdbcOutputSerializer.of(getSerializer(TypeInformation.of(Row.class), true));
            jdbcOutputFormat.open(serializer);

            TestEntry entry = TEST_DATA[0];
            Row row = new Row(5);
            row.setField(0, entry.id);
            row.setField(1, entry.title);
            row.setField(2, entry.author);
            row.setField(3, 0L); // use incompatible type (Long instead of Double)
            row.setField(4, entry.qty);
            jdbcOutputFormat.writeRecord(row);
            jdbcOutputFormat.close();
        } catch (Exception e) {
            assertThat(findThrowable(e, ClassCastException.class)).isPresent();
            assertThat(findThrowableWithMessage(e, expectedMsg)).isPresent();
        }
    }

    @Test
    void testExceptionOnClose() {
        String expectedMsg = "Writing records to JDBC failed.";
        try {
            jdbcOutputFormat =
                    JdbcRowOutputFormat.buildJdbcOutputFormat()
                            .setDrivername(getMetadata().getDriverClass())
                            .setDBUrl(getMetadata().getJdbcUrl())
                            .setQuery(String.format(INSERT_TEMPLATE, OUTPUT_TABLE))
                            .setSqlTypes(
                                    new int[] {
                                        Types.INTEGER,
                                        Types.VARCHAR,
                                        Types.VARCHAR,
                                        Types.DOUBLE,
                                        Types.INTEGER
                                    })
                            .finish();
            JdbcOutputSerializer<Row> serializer =
                    JdbcOutputSerializer.of(getSerializer(TypeInformation.of(Row.class), true));
            jdbcOutputFormat.open(serializer);

            TestEntry entry = TEST_DATA[0];
            Row row = new Row(5);
            row.setField(0, entry.id);
            row.setField(1, entry.title);
            row.setField(2, entry.author);
            row.setField(3, entry.price);
            row.setField(4, entry.qty);
            jdbcOutputFormat.writeRecord(row);
            jdbcOutputFormat.writeRecord(
                    row); // writing the same record twice must yield a unique key violation.

            jdbcOutputFormat.close();
        } catch (Exception e) {
            assertThat(findThrowable(e, RuntimeException.class)).isPresent();
            assertThat(findThrowableWithMessage(e, expectedMsg)).isPresent();
        }
    }

    @Test
    void testJdbcOutputFormat() throws IOException, SQLException {
        jdbcOutputFormat =
                JdbcRowOutputFormat.buildJdbcOutputFormat()
                        .setDrivername(getMetadata().getDriverClass())
                        .setDBUrl(getMetadata().getJdbcUrl())
                        .setQuery(String.format(INSERT_TEMPLATE, OUTPUT_TABLE))
                        .finish();
        JdbcOutputSerializer<Row> serializer =
                JdbcOutputSerializer.of(getSerializer(TypeInformation.of(Row.class), true));
        jdbcOutputFormat.open(serializer);

        for (TestEntry entry : TEST_DATA) {
            jdbcOutputFormat.writeRecord(toRow(entry));
        }

        jdbcOutputFormat.close();

        try (Connection dbConn = DriverManager.getConnection(getMetadata().getJdbcUrl());
                PreparedStatement statement = dbConn.prepareStatement(SELECT_ALL_NEWBOOKS);
                ResultSet resultSet = statement.executeQuery()) {
            int recordCount = 0;
            while (resultSet.next()) {
                assertThat(resultSet.getObject("id")).isEqualTo(TEST_DATA[recordCount].id);
                assertThat(resultSet.getObject("title")).isEqualTo(TEST_DATA[recordCount].title);
                assertThat(resultSet.getObject("author")).isEqualTo(TEST_DATA[recordCount].author);
                assertThat(resultSet.getObject("price")).isEqualTo(TEST_DATA[recordCount].price);
                assertThat(resultSet.getObject("qty")).isEqualTo(TEST_DATA[recordCount].qty);

                recordCount++;
            }
            assertThat(recordCount).isEqualTo(TEST_DATA.length);
        }
    }

    @Test
    void testFlush() throws SQLException, IOException {
        jdbcOutputFormat =
                JdbcRowOutputFormat.buildJdbcOutputFormat()
                        .setDrivername(getMetadata().getDriverClass())
                        .setDBUrl(getMetadata().getJdbcUrl())
                        .setQuery(String.format(INSERT_TEMPLATE, OUTPUT_TABLE_2))
                        .setBatchSize(3)
                        .finish();

        JdbcOutputSerializer<Row> serializer =
                JdbcOutputSerializer.of(getSerializer(TypeInformation.of(Row.class), true));
        try (Connection dbConn = getMetadata().getConnection();
                PreparedStatement statement = dbConn.prepareStatement(SELECT_ALL_NEWBOOKS_2)) {
            jdbcOutputFormat.open(serializer);
            for (int i = 0; i < 2; ++i) {
                jdbcOutputFormat.writeRecord(toRow(TEST_DATA[i]));
            }
            try (ResultSet resultSet = statement.executeQuery()) {
                assertThat(resultSet.next()).isFalse();
            }
            jdbcOutputFormat.writeRecord(toRow(TEST_DATA[2]));
            try (ResultSet resultSet = statement.executeQuery()) {
                int recordCount = 0;
                while (resultSet.next()) {
                    assertThat(resultSet.getObject("id")).isEqualTo(TEST_DATA[recordCount].id);
                    assertThat(resultSet.getObject("title"))
                            .isEqualTo(TEST_DATA[recordCount].title);
                    assertThat(resultSet.getObject("author"))
                            .isEqualTo(TEST_DATA[recordCount].author);
                    assertThat(resultSet.getObject("price"))
                            .isEqualTo(TEST_DATA[recordCount].price);
                    assertThat(resultSet.getObject("qty")).isEqualTo(TEST_DATA[recordCount].qty);
                    recordCount++;
                }
                assertThat(recordCount).isEqualTo(3);
            }
        } finally {
            jdbcOutputFormat.close();
        }
    }

    @Test
    void testInvalidConnectionInJdbcOutputFormat() throws IOException, SQLException {
        jdbcOutputFormat =
                JdbcRowOutputFormat.buildJdbcOutputFormat()
                        .setDrivername(getMetadata().getDriverClass())
                        .setDBUrl(getMetadata().getJdbcUrl())
                        .setQuery(String.format(INSERT_TEMPLATE, OUTPUT_TABLE_3))
                        .finish();
        JdbcOutputSerializer<Row> serializer =
                JdbcOutputSerializer.of(getSerializer(TypeInformation.of(Row.class), true));
        jdbcOutputFormat.open(serializer);

        // write records
        for (int i = 0; i < 3; i++) {
            jdbcOutputFormat.writeRecord(toRow(TEST_DATA[i]));
        }

        // close connection
        jdbcOutputFormat.getConnection().close();

        for (int i = 3; i < TEST_DATA.length; i++) {
            jdbcOutputFormat.writeRecord(toRow(TEST_DATA[i]));
        }

        jdbcOutputFormat.close();

        try (Connection dbConn = DriverManager.getConnection(getMetadata().getJdbcUrl());
                PreparedStatement statement = dbConn.prepareStatement(SELECT_ALL_NEWBOOKS_3);
                ResultSet resultSet = statement.executeQuery()) {
            int recordCount = 0;
            while (resultSet.next()) {
                assertThat(resultSet.getObject("id")).isEqualTo(TEST_DATA[recordCount].id);
                assertThat(resultSet.getObject("title")).isEqualTo(TEST_DATA[recordCount].title);
                assertThat(resultSet.getObject("author")).isEqualTo(TEST_DATA[recordCount].author);
                assertThat(resultSet.getObject("price")).isEqualTo(TEST_DATA[recordCount].price);
                assertThat(resultSet.getObject("qty")).isEqualTo(TEST_DATA[recordCount].qty);

                recordCount++;
            }
            assertThat(recordCount).isEqualTo(TEST_DATA.length);
        }
    }
}
